package streaming.func.udf;

import streaming.api.beans.SensorReading;
import streaming.func.model.AvgTemp;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * Demonstrates a user-defined aggregate function (UDAF) with the Flink Table API and SQL.
 *
 * <p>The job reads comma-separated sensor records ({@code id,timestamp,temperature}) from a
 * text file, registers {@link AvgTemp} under the name {@code avgTemp}, applies it per sensor
 * id, and prints the Table API result as a retract stream.
 */
public class UDFTest3_aggregate {

    /** Input file used when no path is supplied on the command line. */
    private static final String DEFAULT_INPUT_PATH =
            "D:\\IdeaProjects\\springboot-flink-1\\flinkTutorial\\src\\main\\resources\\sensor.txt";

    /**
     * Builds and runs the demo job.
     *
     * @param args optional; {@code args[0]}, when present, is the path of the sensor file to
     *             read. Without arguments {@link #DEFAULT_INPUT_PATH} is used.
     * @throws Exception if the Flink job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        // 1. Set up the streaming and table environments; parallelism 1 keeps the printed output in one task.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2. Read the raw lines and convert each one into a SensorReading.
        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;
        DataStream<String> inputStream = env.readTextFile(inputPath);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            // valueOf replaces the deprecated boxed constructors; parsing rules are identical.
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });

        // 3. Turn the stream into a table, renaming timestamp -> ts and temperature -> temp.
        Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");

        // 4. Custom aggregate function: per the original author's note, AvgTemp yields the
        //    average temperature of each sensor (its implementation lives in another file).
        AvgTemp avgTemp = new AvgTemp();
        // The UDF must be registered in the table environment before it can be referenced by name.
        tableEnv.registerFunction("avgTemp", avgTemp);

        // 4.1 Table API: group by sensor id and apply the aggregate.
        Table resultTable = sensorTable.groupBy("id").aggregate("avgTemp(temp) as avgtemp").select("id, avgtemp");

        // 4.2 SQL: the equivalent query against a temporary view of the same table.
        tableEnv.createTemporaryView("sensor", sensorTable);
        // Built so the query is still parsed and validated; intentionally not printed. To print
        // it as well, convert it with toRetractStream the same way as resultTable below.
        Table resultSqlTable = tableEnv.sqlQuery("select id, avgTemp(temp) from sensor group by id");

        // 5. Print the Table API result; a grouped aggregate is emitted as a retract stream.
        tableEnv.toRetractStream(resultTable, Row.class).print("result");

        env.execute();
    }
}
